Create a DataFrame from a CSV file

Spark SQL provides spark.read.csv("path") to read a CSV file into a Spark DataFrame and dataframe.write.csv("path") to save or write a DataFrame to a CSV file. Using either
  • spark.read.csv("path")
  • spark.read.format("csv").load("path")
you can read a CSV file into a Spark DataFrame. Both take the path of the file to read as an argument.

// The two reads below are equivalent; run either one.
val df = spark.read.csv("/FileStore/tables/real_state.csv")
val df = spark.read.format("csv").load("/FileStore/tables/real_state.csv")
df.show(5)

 
Default schema for CSV file
By default, Spark reads every column as a string (StringType) and, because no header is read, names the columns _c0, _c1, and so on.
df.printSchema
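
To see the default behavior on a small file, here is a minimal sketch. The sample data, the temporary file, and the names raw, colNames, colTypes, and rowCount are made up for illustration, and it assumes Spark can read from the local filesystem.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the active session in spark-shell or a notebook; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("csv-default-schema").getOrCreate()

// Hypothetical sample data written to a temporary file.
val path = Files.createTempFile("sample", ".csv")
Files.write(path, "city,price\nSacramento,59222\nRio Linda,68212\n".getBytes("UTF-8"))

// No options: no header handling and no type inference.
val raw = spark.read.csv(path.toUri.toString)
raw.printSchema()

val colNames = raw.columns.toList                                      // generated names
val colTypes = raw.schema.fields.map(_.dataType.simpleString).toList   // every column is a string
val rowCount = raw.count()                                             // the header line is counted as data
```

Because the header option is off, the line "city,price" shows up as an ordinary data row.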


Options while reading CSV file
The Spark CSV data source provides several options for working with CSV files, including:
  • header
  • delimiter
  • inferSchema
  • quote
  • nullValue
  • dateFormat
Header
This option is used to read the first line of the CSV file as column names. By default the value of this option is false. It affects only the column names: all column types are still assumed to be string.

// The three reads below are equivalent; run any one of them.
val df = spark.read.options(Map("header"->"true")).csv("/FileStore/tables/real_state.csv")
val df = spark.read.option("header","true").csv("/FileStore/tables/real_state.csv")
val df = spark.read.format("csv").option("header","true").load("/FileStore/tables/real_state.csv")
df.show(5)


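Delimiter :- The delimiter option is used to specify the column delimiter of the CSV file. By default, it is the comma (,) character, but it can be set to any character using this option. The option can also be written as sep, which is an equivalent name for the same setting.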
// The four reads below are equivalent; run any one of them.
val df = spark.read.options(Map("header"->"true","delimiter"->",")).csv("/FileStore/tables/real_state.csv")
val df = spark.read.option("header","true").option("delimiter",",").csv("/FileStore/tables/real_state.csv")
val df = spark.read.option("header","true").option("sep",",").csv("/FileStore/tables/real_state.csv")
val df = spark.read.format("csv").option("header","true").option("delimiter",",").load("/FileStore/tables/real_state.csv")
df.show(5)


inferSchema:- The default value of this option is false. When set to true, Spark automatically infers the column types from the data. This requires reading the data one more time to infer the schema.
// The three reads below are equivalent; run any one of them.
val df = spark.read.options(Map("header"->"true","delimiter"->",","inferSchema"->"true")).csv("/FileStore/tables/real_state.csv")
val df = spark.read.option("header","true").option("delimiter",",").option("inferSchema","true").csv("/FileStore/tables/real_state.csv")
val df = spark.read.format("csv").option("header","true").option("delimiter",",").option("inferSchema","true").load("/FileStore/tables/real_state.csv")
df.show(5)
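
The effect of inferSchema is easiest to see by reading the same file twice, once without and once with the option. The following is a rough sketch on made-up data; the file contents and the names asStrings, inferred, stringTypes, and inferredTypes are illustrative assumptions, and local filesystem access is assumed.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the active session in spark-shell or a notebook; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("csv-infer-schema").getOrCreate()

// Hypothetical sample data: one text column, one whole-number column, one decimal column.
val path = Files.createTempFile("houses", ".csv")
Files.write(path, "city,beds,price\nSacramento,2,59222.5\nRio Linda,3,68212.0\n".getBytes("UTF-8"))
val uri = path.toUri.toString

// Without inferSchema every column stays a string.
val asStrings = spark.read.option("header", "true").csv(uri)
// With inferSchema Spark scans the data and picks a type per column.
val inferred = spark.read.option("header", "true").option("inferSchema", "true").csv(uri)

val stringTypes = asStrings.schema.fields.map(f => f.name -> f.dataType.simpleString).toList
val inferredTypes = inferred.schema.fields.map(f => f.name -> f.dataType.simpleString).toList

println(stringTypes)
println(inferredTypes)
```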


nullValue:- Using the nullValue option you can specify a string in the CSV that should be treated as null. For example, you may want a date column holding the value "1900-01-01" to be set to null in the DataFrame.
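
The "1900-01-01" case can be sketched as follows. The sample rows, the column names name and joined, and the value names withNulls and nullNames are hypothetical, chosen only to illustrate the option, and local filesystem access is assumed.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the active session in spark-shell or a notebook; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("csv-null-value").getOrCreate()

// Hypothetical sample data: Bob's date is the placeholder value 1900-01-01.
val path = Files.createTempFile("people", ".csv")
Files.write(path, "name,joined\nAnn,2020-05-17\nBob,1900-01-01\n".getBytes("UTF-8"))

// Any field equal to the nullValue string is read as null.
val withNulls = spark.read
  .option("header", "true")
  .option("nullValue", "1900-01-01")
  .csv(path.toUri.toString)

// Names of the rows whose joined column came back as null.
val nullNames = withNulls.filter(col("joined").isNull).select("name").collect().map(_.getString(0)).toList
println(nullNames)
```

Note that the option is not tied to one column: the placeholder string is treated as null wherever it appears in the file.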


val df = spark.read.format("csv")
            .option("header","true")
            .option("inferSchema","true")
            .option("nullValue","NA")
            .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
            .option("mode","failfast")
            .option("path","file:///home/training/data/csv/emp.csv").load()

df.write.format("parquet").mode("overwrite").save("file:///home/training/data/output/parquet")

val df = spark.read.format("csv")
             .option("sep",";")
             .option("inferSchema","true")
             .option("header", "true")
             .load("data/people.csv")

Method-2
 val df = spark.read.options( Map("header" -> "true",
                "inferSchema" -> "true",
                "nullValue" -> "NA",
                "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
                "mode" -> "failfast")
                ).csv("/FileStore/tables/real_state.csv")

While creating the DataFrame, we specified the option to infer the schema using option("inferSchema", "true"). This instructs Spark to infer the data type of each column automatically when reading the CSV file.
 
Read multiple CSV files
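// Pass each path as a separate argument; a single comma-separated string is treated as one path.
val df = spark.read.csv("file1", "file2", "file3")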

Read all CSV files in a directory
val df = spark.read.csv("Folder path")  
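
Both ways of reading several files can be tried on a temporary directory. This is only a sketch: the file names part1.csv and part2.csv, their contents, and the names files, fromList, and fromDir are invented for illustration, and local filesystem access is assumed.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the active session in spark-shell or a notebook; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("csv-multiple-files").getOrCreate()

// Hypothetical directory holding two small CSV files with the same header.
val dir = Files.createTempDirectory("csv-parts")
val files = Seq(
  "part1.csv" -> "id,name\n1,Ann\n",
  "part2.csv" -> "id,name\n2,Bob\n3,Cy\n"
).map { case (fileName, content) =>
  val p = dir.resolve(fileName)
  Files.write(p, content.getBytes("UTF-8"))
  p.toUri.toString
}

// csv accepts a variable number of paths, so a collection is expanded with ": _*".
val fromList = spark.read.option("header", "true").csv(files: _*)
// Passing the directory reads every file inside it.
val fromDir = spark.read.option("header", "true").csv(dir.toUri.toString)

val listCount = fromList.count()
val dirCount = fromDir.count()
println(s"$listCount rows from the list, $dirCount rows from the directory")
```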

How to handle Malformed Records
You will sometimes get a malformed record, and that's a natural condition. When you are reading hundreds of thousands of rows, some of them may not conform to the schema. Those are incorrect records. The Spark DataFrameReader allows you to set an option called mode, which takes one of three values.
  • permissive
  • dropMalformed
  • failFast

The default value is permissive. If you set the mode to permissive and Spark encounters a malformed record, the DataFrameReader sets the fields it cannot parse to null and puts the entire raw row into a string column called _corrupt_record. For CSV, this column is kept only if the schema you supply includes a string field with that name. The corrupt record column allows you to dump those records to a separate file, so you can investigate and fix them at a later stage. The dropMalformed mode quietly drops the malformed records; you won't even notice that some rows were not accurate. The failFast mode raises an exception.
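
The three modes can be compared on one small file that contains a single bad row. This is a sketch under assumptions: the sample data, the id/name schema, and the names permissiveRows, corruptCount, keptIds, and failFastResult are all made up for illustration, and local filesystem access is assumed.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Reuses the active session in spark-shell or a notebook; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("csv-malformed").getOrCreate()

// Hypothetical sample data: the second row has "x" where an integer id is expected.
val path = Files.createTempFile("emp", ".csv")
Files.write(path, "id,name\n1,Ann\nx,Bob\n3,Cy\n".getBytes("UTF-8"))
val uri = path.toUri.toString

val dataSchema = new StructType().add("id", IntegerType).add("name", StringType)
// The extra string field is where permissive mode stores the raw text of a bad row.
val schemaWithCorrupt = dataSchema.add("_corrupt_record", StringType)

// permissive (the default): bad rows are kept and flagged.
val permissiveRows = spark.read.option("header", "true").schema(schemaWithCorrupt).csv(uri).collect()
val corruptCount = permissiveRows.count(row => !row.isNullAt(2))

// dropMalformed: bad rows are silently removed. collect() is used rather than count()
// so that every column is actually parsed.
val keptIds = spark.read.option("header", "true").option("mode", "dropMalformed")
  .schema(dataSchema).csv(uri).collect().map(_.getInt(0)).toList.sorted

// failFast: the read fails once the bad row is parsed.
val failFastResult = Try(
  spark.read.option("header", "true").option("mode", "failFast").schema(dataSchema).csv(uri).collect()
)

println(s"corrupt rows: $corruptCount, kept ids: $keptIds, failFast failed: ${failFastResult.isFailure}")
```

Collecting the permissive result first and inspecting it in plain Scala avoids querying the corrupt record column on its own directly against the file.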

 
